/*
 * Licensed to the Apache Software Foundation (ASF) under one
 * or more contributor license agreements.  See the NOTICE file
 * distributed with this work for additional information
 * regarding copyright ownership.  The ASF licenses this file
 * to you under the Apache License, Version 2.0 (the
 * "License"); you may not use this file except in compliance
 * with the License.  You may obtain a copy of the License at
 *
 *   http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing,
 * software distributed under the License is distributed on an
 * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
 * KIND, either express or implied.  See the License for the
 * specific language governing permissions and limitations
 * under the License.
 */

package org.apache.druid.indexer;

import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.jsontype.NamedType;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Lists;
import org.apache.commons.io.FileUtils;
import org.apache.druid.data.input.impl.CSVParseSpec;
import org.apache.druid.data.input.impl.DimensionsSpec;
import org.apache.druid.data.input.impl.InputRowParser;
import org.apache.druid.data.input.impl.JSONParseSpec;
import org.apache.druid.data.input.impl.StringInputRowParser;
import org.apache.druid.data.input.impl.TimestampSpec;
import org.apache.druid.indexer.granularity.UniformGranularitySpec;
import org.apache.druid.java.util.common.Intervals;
import org.apache.druid.java.util.common.RE;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.druid.java.util.common.granularity.Granularities;
import org.apache.druid.query.aggregation.AggregatorFactory;
import org.apache.druid.query.aggregation.CountAggregatorFactory;
import org.apache.druid.query.aggregation.LongSumAggregatorFactory;
import org.apache.druid.query.aggregation.hyperloglog.HyperUniquesAggregatorFactory;
import org.apache.druid.segment.indexing.DataSchema;
import org.apache.druid.timeline.DataSegment;
import org.apache.druid.timeline.partition.HashBasedNumberedShardSpec;
import org.apache.druid.timeline.partition.HashPartitionFunction;
import org.apache.druid.timeline.partition.NumberedShardSpec;
import org.apache.druid.timeline.partition.ShardSpec;
import org.apache.druid.timeline.partition.SingleDimensionShardSpec;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.LocalFileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.SequenceFile;
import org.apache.hadoop.io.SequenceFile.Writer;
import org.apache.hadoop.io.compress.CompressionCodec;
import org.apache.hadoop.mapreduce.MRJobConfig;
import org.apache.hadoop.mapreduce.lib.input.SequenceFileInputFormat;
import org.joda.time.DateTime;
import org.joda.time.DateTimeComparator;
import org.joda.time.Interval;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

import java.io.File;
import java.io.IOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Map.Entry;
import java.util.TreeMap;

@RunWith(Parameterized.class)
public class IndexGeneratorJobTest
{
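  // Aggregators for the "website" cases: a long sum of visits plus a HyperLogLog
  // sketch of unique hosts.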
  private static final AggregatorFactory[] AGGS1 = {
      new LongSumAggregatorFactory("visited_num", "visited_num"),
      new HyperUniquesAggregatorFactory("unique_hosts", "host")
  };

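  // Aggregator for the dimension-order ("inherit_dims") cases: a simple row count.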
  private static final AggregatorFactory[] AGGS2 = {
      new CountAggregatorFactory("count")
  };

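  /**
   * Builds the parameterized test cases. Each base constructor below is expanded into two
   * runs, one with forceExtendableShardSpecs = true and one with false.
   */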
  @Parameterized.Parameters(name = "useCombiner={0}, partitionType={1}, interval={2}, shardInfoForEachSegment={3}, " +
                                   "data={4}, inputFormatName={5}, inputRowParser={6}, maxRowsInMemory={7}, " +
                                   "maxBytesInMemory={8}, aggs={9}, datasourceName={10}, forceExtendableShardSpecs={11}")
  public static Collection<Object[]> constructFeed()
  {
    final Object[][] baseConstructors = new Object[][]{
        {
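            // Single-dimension partitioning on "host" over two days of CSV data,
            // parsed with StringInputRowParser.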
            false,
            "single",
            "2014-10-22T00:00:00Z/P2D",
            new String[][][]{
                {
                    {null, "c.example.com"},
                    {"c.example.com", "e.example.com"},
                    {"e.example.com", "g.example.com"},
                    {"g.example.com", "i.example.com"},
                    {"i.example.com", null}
                },
                {
                    {null, "c.example.com"},
                    {"c.example.com", "e.example.com"},
                    {"e.example.com", "g.example.com"},
                    {"g.example.com", "i.example.com"},
                    {"i.example.com", null}
                }
            },
            ImmutableList.of(
                "2014102200,a.example.com,100",
                "2014102200,b.exmaple.com,50",
                "2014102200,c.example.com,200",
                "2014102200,d.example.com,250",
                "2014102200,e.example.com,123",
                "2014102200,f.example.com,567",
                "2014102200,g.example.com,11",
                "2014102200,h.example.com,251",
                "2014102200,i.example.com,963",
                "2014102200,j.example.com,333",
                "2014102300,a.example.com,100",
                "2014102300,b.exmaple.com,50",
                "2014102300,c.example.com,200",
                "2014102300,d.example.com,250",
                "2014102300,e.example.com,123",
                "2014102300,f.example.com,567",
                "2014102300,g.example.com,11",
                "2014102300,h.example.com,251",
                "2014102300,i.example.com,963",
                "2014102300,j.example.com,333"
            ),
            null,
            new StringInputRowParser(
                new CSVParseSpec(
                    new TimestampSpec("timestamp", "yyyyMMddHH", null),
                    new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host"))),
                    null,
                    ImmutableList.of("timestamp", "host", "visited_num"),
                    false,
                    0
                ),
                null
            ),
            null,
            null,
            AGGS1,
            "website"
        },
        {
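            // Hash partitioning (4 buckets) over one day of CSV data, parsed with
            // HadoopyStringInputRowParser.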
            false,
            "hashed",
            "2014-10-22T00:00:00Z/P1D",
            new Integer[][][]{
                {
                    {0, 4},
                    {1, 4},
                    {2, 4},
                    {3, 4}
                }
            },
            ImmutableList.of(
                "2014102200,a.example.com,100",
                "2014102201,b.exmaple.com,50",
                "2014102202,c.example.com,200",
                "2014102203,d.example.com,250",
                "2014102204,e.example.com,123",
                "2014102205,f.example.com,567",
                "2014102206,g.example.com,11",
                "2014102207,h.example.com,251",
                "2014102208,i.example.com,963",
                "2014102209,j.example.com,333",
                "2014102210,k.example.com,253",
                "2014102211,l.example.com,321",
                "2014102212,m.example.com,3125",
                "2014102213,n.example.com,234",
                "2014102214,o.example.com,325",
                "2014102215,p.example.com,3533",
                "2014102216,q.example.com,500",
                "2014102216,q.example.com,87"
            ),
            null,
            new HadoopyStringInputRowParser(
                new CSVParseSpec(
                    new TimestampSpec("timestamp", "yyyyMMddHH", null),
                    new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host"))),
                    null,
                    ImmutableList.of("timestamp", "host", "visited_num"),
                    false,
                    0
                )
            ),
            null,
            null,
            AGGS1,
            "website"
        },
        {
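            // Same hash-partitioned data as above, but with the combiner enabled and
            // parsed with StringInputRowParser.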
            true,
            "hashed",
            "2014-10-22T00:00:00Z/P1D",
            new Integer[][][]{
                {
                    {0, 4},
                    {1, 4},
                    {2, 4},
                    {3, 4}
                }
            },
            ImmutableList.of(
                "2014102200,a.example.com,100",
                "2014102201,b.exmaple.com,50",
                "2014102202,c.example.com,200",
                "2014102203,d.example.com,250",
                "2014102204,e.example.com,123",
                "2014102205,f.example.com,567",
                "2014102206,g.example.com,11",
                "2014102207,h.example.com,251",
                "2014102208,i.example.com,963",
                "2014102209,j.example.com,333",
                "2014102210,k.example.com,253",
                "2014102211,l.example.com,321",
                "2014102212,m.example.com,3125",
                "2014102213,n.example.com,234",
                "2014102214,o.example.com,325",
                "2014102215,p.example.com,3533",
                "2014102216,q.example.com,500",
                "2014102216,q.example.com,87"
            ),
            null,
            new StringInputRowParser(
                new CSVParseSpec(
                    new TimestampSpec("timestamp", "yyyyMMddHH", null),
                    new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host"))),
                    null,
                    ImmutableList.of("timestamp", "host", "visited_num"),
                    false,
                    0
                ),
                null
            ),
            null,
            null,
            AGGS1,
            "website"
        },
        {
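            // Single-dimension partitioning reading the same two-day data back from a
            // Hadoop SequenceFile via HadoopyStringInputRowParser.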
            false,
            "single",
            "2014-10-22T00:00:00Z/P2D",
            new String[][][]{
                {
                    {null, "c.example.com"},
                    {"c.example.com", "e.example.com"},
                    {"e.example.com", "g.example.com"},
                    {"g.example.com", "i.example.com"},
                    {"i.example.com", null}
                },
                {
                    {null, "c.example.com"},
                    {"c.example.com", "e.example.com"},
                    {"e.example.com", "g.example.com"},
                    {"g.example.com", "i.example.com"},
                    {"i.example.com", null}
                }
            },
            ImmutableList.of(
                "2014102200,a.example.com,100",
                "2014102200,b.exmaple.com,50",
                "2014102200,c.example.com,200",
                "2014102200,d.example.com,250",
                "2014102200,e.example.com,123",
                "2014102200,f.example.com,567",
                "2014102200,g.example.com,11",
                "2014102200,h.example.com,251",
                "2014102200,i.example.com,963",
                "2014102200,j.example.com,333",
                "2014102300,a.example.com,100",
                "2014102300,b.exmaple.com,50",
                "2014102300,c.example.com,200",
                "2014102300,d.example.com,250",
                "2014102300,e.example.com,123",
                "2014102300,f.example.com,567",
                "2014102300,g.example.com,11",
                "2014102300,h.example.com,251",
                "2014102300,i.example.com,963",
                "2014102300,j.example.com,333"
            ),
            SequenceFileInputFormat.class.getName(),
            new HadoopyStringInputRowParser(
                new CSVParseSpec(
                    new TimestampSpec("timestamp", "yyyyMMddHH", null),
                    new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of("host"))),
                    null,
                    ImmutableList.of("timestamp", "host", "visited_num"),
                    false,
                    0
                )
            ),
            null,
            null,
            AGGS1,
            "website"
        },
        {
            // Tests that new indexes inherit the dimension order from the previous index.
            false,
            "hashed",
            "2014-10-22T00:00:00Z/P1D",
            new Integer[][][]{
                {
                    {0, 1} // use a single partition; dimension-order inheritance is not supported across partitions
                }
            },
            ImmutableList.of(
                "{\"ts\":\"2014102200\", \"X\":\"x.example.com\"}",
                "{\"ts\":\"2014102201\", \"Y\":\"y.example.com\"}",
                "{\"ts\":\"2014102202\", \"M\":\"m.example.com\"}",
                "{\"ts\":\"2014102203\", \"Q\":\"q.example.com\"}",
                "{\"ts\":\"2014102204\", \"B\":\"b.example.com\"}",
                "{\"ts\":\"2014102205\", \"F\":\"f.example.com\"}"
            ),
            null,
            new StringInputRowParser(
                new JSONParseSpec(
                    new TimestampSpec("ts", "yyyyMMddHH", null),
                    DimensionsSpec.EMPTY,
                    null,
                    null,
                    null
                ),
                null
            ),
            1, // force 1 row max per index for easier testing
            null,
            AGGS2,
            "inherit_dims"
        },
        {
            // Tests that pre-specified dim order is maintained across indexes.
            false,
            "hashed",
            "2014-10-22T00:00:00Z/P1D",
            new Integer[][][]{
                {
                    {0, 1}
                }
            },
            ImmutableList.of(
                "{\"ts\":\"2014102200\", \"X\":\"x.example.com\"}",
                "{\"ts\":\"2014102201\", \"Y\":\"y.example.com\"}",
                "{\"ts\":\"2014102202\", \"M\":\"m.example.com\"}",
                "{\"ts\":\"2014102203\", \"Q\":\"q.example.com\"}",
                "{\"ts\":\"2014102204\", \"B\":\"b.example.com\"}",
                "{\"ts\":\"2014102205\", \"F\":\"f.example.com\"}"
            ),
            null,
            new StringInputRowParser(
                new JSONParseSpec(
                    new TimestampSpec("ts", "yyyyMMddHH", null),
                    new DimensionsSpec(DimensionsSpec.getDefaultSchemas(ImmutableList.of(
                        "B",
                        "F",
                        "M",
                        "Q",
                        "X",
                        "Y"
                    ))),
                    null,
                    null,
                    null
                ),
                null
            ),
            1, // force 1 row max per index for easier testing
            null,
            AGGS2,
            "inherit_dims2"
        }
    };

    // Run each baseConstructor with/without forceExtendableShardSpecs.
    final List<Object[]> constructors = new ArrayList<>();
    for (Object[] baseConstructor : baseConstructors) {
      for (int forceExtendableShardSpecs = 0; forceExtendableShardSpecs < 2; forceExtendableShardSpecs++) {
        final Object[] fullConstructor = new Object[baseConstructor.length + 1];
        System.arraycopy(baseConstructor, 0, fullConstructor, 0, baseConstructor.length);
        fullConstructor[baseConstructor.length] = forceExtendableShardSpecs == 0;
        constructors.add(fullConstructor);
      }
    }

    return constructors;
  }

  @Rule
  public final TemporaryFolder temporaryFolder = new TemporaryFolder();

  private final boolean useCombiner;
  private final String partitionType;
  private final Interval interval;
  private final Object[][][] shardInfoForEachSegment;
  private final List<String> data;
  private final String inputFormatName;
  private final InputRowParser inputRowParser;
  private final Integer maxRowsInMemory;
  private final Long maxBytesInMemory;
  private final AggregatorFactory[] aggs;
  private final String datasourceName;
  private final boolean forceExtendableShardSpecs;

  private ObjectMapper mapper;
  private HadoopDruidIndexerConfig config;
  private File dataFile;
  private File tmpDir;

  public IndexGeneratorJobTest(
      boolean useCombiner,
      String partitionType,
      String interval,
      Object[][][] shardInfoForEachSegment,
      List<String> data,
      String inputFormatName,
      InputRowParser inputRowParser,
      Integer maxRowsInMemory,
      Long maxBytesInMemory,
      AggregatorFactory[] aggs,
      String datasourceName,
      boolean forceExtendableShardSpecs
  )
  {
    this.useCombiner = useCombiner;
    this.partitionType = partitionType;
    this.shardInfoForEachSegment = shardInfoForEachSegment;
    this.interval = Intervals.of(interval);
    this.data = data;
    this.inputFormatName = inputFormatName;
    this.inputRowParser = inputRowParser;
    this.maxRowsInMemory = maxRowsInMemory;
    this.maxBytesInMemory = maxBytesInMemory;
    this.aggs = aggs;
    this.datasourceName = datasourceName;
    this.forceExtendableShardSpecs = forceExtendableShardSpecs;
  }

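  /**
   * Writes each line of data as a BytesWritable key/value pair to an uncompressed
   * SequenceFile on the local filesystem (the keys are arbitrary incrementing integers).
   * Used by the SequenceFileInputFormat test case.
   */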
  private void writeDataToLocalSequenceFile(File outputFile, List<String> data) throws IOException
  {
    Configuration conf = new Configuration();
    LocalFileSystem fs = FileSystem.getLocal(conf);
    try (Writer fileWriter = SequenceFile.createWriter(
        fs,
        conf,
        new Path(outputFile.getAbsolutePath()),
        BytesWritable.class,
        BytesWritable.class,
        SequenceFile.CompressionType.NONE,
        (CompressionCodec) null
    )) {
      int keyCount = 10;
      for (String line : data) {
        ByteBuffer buf = ByteBuffer.allocate(4);
        buf.putInt(keyCount);
        BytesWritable key = new BytesWritable(buf.array());
        BytesWritable value = new BytesWritable(StringUtils.toUtf8(line));
        fileWriter.append(key, value);
        keyCount += 1;
      }
    }
  }

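  /**
   * Registers the shard spec subtypes, writes the test data to a temporary input file,
   * and assembles the HadoopDruidIndexerConfig under test.
   */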
  @Before
  public void setUp() throws Exception
  {
    mapper = HadoopDruidIndexerConfig.JSON_MAPPER;
    mapper.registerSubtypes(new NamedType(HashBasedNumberedShardSpec.class, "hashed"));
    mapper.registerSubtypes(new NamedType(SingleDimensionShardSpec.class, "single"));

    dataFile = temporaryFolder.newFile();
    tmpDir = temporaryFolder.newFolder();

    HashMap<String, Object> inputSpec = new HashMap<>();
    inputSpec.put("paths", dataFile.getCanonicalPath());
    inputSpec.put("type", "static");
    if (inputFormatName != null) {
      inputSpec.put("inputFormat", inputFormatName);
    }

    if (SequenceFileInputFormat.class.getName().equals(inputFormatName)) {
      writeDataToLocalSequenceFile(dataFile, data);
    } else {
      FileUtils.writeLines(dataFile, data);
    }

    config = new HadoopDruidIndexerConfig(
        new HadoopIngestionSpec(
            DataSchema.builder()
                      .withDataSource(datasourceName)
                      .withParserMap(mapper.convertValue(inputRowParser, Map.class))
                      .withAggregators(aggs)
                      .withGranularity(
                          new UniformGranularitySpec(
                              Granularities.DAY,
                              Granularities.NONE,
                              ImmutableList.of(interval)
                          )
                      )
                      .withObjectMapper(mapper)
                      .build(),
            new HadoopIOConfig(
                ImmutableMap.copyOf(inputSpec),
                null,
                tmpDir.getCanonicalPath()
            ),
            new HadoopTuningConfig(
                tmpDir.getCanonicalPath(),
                null,
                null,
                null,
                null,
                null,
                null,
                maxRowsInMemory,
                maxBytesInMemory,
                true,
                false,
                false,
                false,
                ImmutableMap.of(MRJobConfig.NUM_REDUCES, "0"), // verifies that an explicitly set reducer count is ignored
                false,
                useCombiner,
                null,
                null,
                forceExtendableShardSpecs,
                false,
                null,
                null,
                null,
                null,
                null,
                1
            )
        )
    );

    config.setShardSpecs(loadShardSpecs(partitionType, shardInfoForEachSegment));
    config = HadoopDruidIndexerConfig.fromSpec(config.getSchema());
  }

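  /**
   * Translates a declarative shard info array into concrete ShardSpecs:
   * {partitionNum, numPartitions} pairs for "hashed", {start, end} ranges on the "host"
   * dimension for "single".
   */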
  private List<ShardSpec> constructShardSpecFromShardInfo(String partitionType, Object[][] shardInfoForEachShard)
  {
    List<ShardSpec> specs = new ArrayList<>();
    if ("hashed".equals(partitionType)) {
      for (Integer[] shardInfo : (Integer[][]) shardInfoForEachShard) {
        specs.add(
            new HashBasedNumberedShardSpec(
                shardInfo[0],
                shardInfo[1],
                shardInfo[0],
                shardInfo[1],
                null,
                HashPartitionFunction.MURMUR3_32_ABS,
                HadoopDruidIndexerConfig.JSON_MAPPER
            )
        );
      }
    } else if ("single".equals(partitionType)) {
      int partitionNum = 0;
      for (String[] shardInfo : (String[][]) shardInfoForEachShard) {
        specs.add(new SingleDimensionShardSpec(
            "host",
            shardInfo[0],
            shardInfo[1],
            partitionNum++,
            shardInfoForEachShard.length
        ));
      }
    } else {
      throw new RE("Invalid partition type:[%s]", partitionType);
    }

    return specs;
  }

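  /**
   * Builds the shard spec map consumed by the config: for each segment interval (keyed by
   * its start time in millis), the constructed ShardSpecs are wrapped in HadoopyShardSpecs
   * carrying a globally increasing shard count.
   */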
  private Map<Long, List<HadoopyShardSpec>> loadShardSpecs(
      String partitionType,
      Object[][][] shardInfoForEachShard
  )
  {
    Map<Long, List<HadoopyShardSpec>> shardSpecs = new TreeMap<>(DateTimeComparator.getInstance());
    int shardCount = 0;
    int segmentNum = 0;
    for (Interval segmentInterval : config.getSegmentGranularIntervals()) {
      List<ShardSpec> specs = constructShardSpecFromShardInfo(partitionType, shardInfoForEachShard[segmentNum++]);
      List<HadoopyShardSpec> actualSpecs = Lists.newArrayListWithExpectedSize(specs.size());
      for (ShardSpec spec : specs) {
        actualSpecs.add(new HadoopyShardSpec(spec, shardCount++));
      }

      shardSpecs.put(segmentInterval.getStartMillis(), actualSpecs);
    }

    return shardSpecs;
  }

  @Test
  public void testIndexGeneratorJob() throws IOException
  {
    verifyJob(new IndexGeneratorJob(config));
  }

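  /**
   * Runs the job, then verifies the published segments against the expected shard layout:
   * the on-disk output folder structure, and each segment's load spec, dimensions, metrics,
   * and shard spec.
   */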
  private void verifyJob(IndexGeneratorJob job) throws IOException
  {
    Assert.assertTrue(JobHelper.runJobs(ImmutableList.of(job)));

    final List<DataSegmentAndIndexZipFilePath> dataSegmentAndIndexZipFilePaths =
        IndexGeneratorJob.getPublishedSegmentAndIndexZipFilePaths(config);

    final Map<Interval, List<DataSegment>> intervalToSegments = new HashMap<>();
    for (DataSegmentAndIndexZipFilePath segmentAndIndexZipFilePath : dataSegmentAndIndexZipFilePaths) {
      intervalToSegments
          .computeIfAbsent(segmentAndIndexZipFilePath.getSegment().getInterval(), k -> new ArrayList<>())
          .add(segmentAndIndexZipFilePath.getSegment());
    }

    JobHelper.renameIndexFilesForSegments(config.getSchema(), dataSegmentAndIndexZipFilePaths);

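    // The tuning config above sets leaveIntermediate to true, so the intermediate working
    // path should survive the cleanup call below.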
    JobHelper.maybeDeleteIntermediatePath(true, config.getSchema());
    File workingPath = new File(config.makeIntermediatePath().toUri().getPath());
    Assert.assertTrue(workingPath.exists());

    final Map<Interval, List<File>> intervalToIndexFiles = new HashMap<>();
    int segmentNum = 0;
    for (DateTime currTime = interval.getStart(); currTime.isBefore(interval.getEnd()); currTime = currTime.plusDays(1)) {
      Object[][] shardInfo = shardInfoForEachSegment[segmentNum++];
      File segmentOutputFolder = new File(
          StringUtils.format(
              "%s/%s/%s_%s/%s",
              config.getSchema().getIOConfig().getSegmentOutputPath(),
              config.getSchema().getDataSchema().getDataSource(),
              currTime.toString(),
              currTime.plusDays(1).toString(),
              config.getSchema().getTuningConfig().getVersion()
          )
      );
      Assert.assertTrue(segmentOutputFolder.exists());
      Assert.assertEquals(shardInfo.length, segmentOutputFolder.list().length);

      for (int partitionNum = 0; partitionNum < shardInfo.length; ++partitionNum) {
        File individualSegmentFolder = new File(segmentOutputFolder, Integer.toString(partitionNum));
        Assert.assertTrue(individualSegmentFolder.exists());

        File indexZip = new File(individualSegmentFolder, "index.zip");
        Assert.assertTrue(indexZip.exists());

        intervalToIndexFiles.computeIfAbsent(new Interval(currTime, currTime.plusDays(1)), k -> new ArrayList<>())
                            .add(indexZip);
      }
    }

    Assert.assertEquals(intervalToSegments.size(), intervalToIndexFiles.size());

    segmentNum = 0;
    for (Entry<Interval, List<DataSegment>> entry : intervalToSegments.entrySet()) {
      final Interval interval = entry.getKey();
      final List<DataSegment> segments = entry.getValue();
      final List<File> indexFiles = intervalToIndexFiles.get(interval);
      Assert.assertNotNull(indexFiles);
      Assert.assertEquals(segments.size(), indexFiles.size());

      Collections.sort(segments);
      indexFiles.sort(Comparator.comparing(File::getAbsolutePath));

      Object[][] shardInfo = shardInfoForEachSegment[segmentNum++];

      for (int i = 0; i < segments.size(); i++) {
        final DataSegment dataSegment = segments.get(i);
        final File indexZip = indexFiles.get(i);

        Assert.assertEquals(config.getSchema().getTuningConfig().getVersion(), dataSegment.getVersion());
        Assert.assertEquals("local", dataSegment.getLoadSpec().get("type"));
        Assert.assertEquals(indexZip.getCanonicalPath(), dataSegment.getLoadSpec().get("path"));
        Assert.assertEquals(Integer.valueOf(9), dataSegment.getBinaryVersion());

        if ("website".equals(datasourceName)) {
          Assert.assertEquals("website", dataSegment.getDataSource());
          Assert.assertEquals("host", dataSegment.getDimensions().get(0));
          Assert.assertEquals("visited_num", dataSegment.getMetrics().get(0));
          Assert.assertEquals("unique_hosts", dataSegment.getMetrics().get(1));
        } else if ("inherit_dims".equals(datasourceName)) {
          Assert.assertEquals("inherit_dims", dataSegment.getDataSource());
          Assert.assertEquals(ImmutableList.of("X", "Y", "M", "Q", "B", "F"), dataSegment.getDimensions());
          Assert.assertEquals("count", dataSegment.getMetrics().get(0));
        } else if ("inherit_dims2".equals(datasourceName)) {
          Assert.assertEquals("inherit_dims2", dataSegment.getDataSource());
          Assert.assertEquals(ImmutableList.of("B", "F", "M", "Q", "X", "Y"), dataSegment.getDimensions());
          Assert.assertEquals("count", dataSegment.getMetrics().get(0));
        } else {
          Assert.fail("Test did not specify supported datasource name");
        }

        if (forceExtendableShardSpecs) {
          NumberedShardSpec spec = (NumberedShardSpec) dataSegment.getShardSpec();
          Assert.assertEquals(i, spec.getPartitionNum());
          Assert.assertEquals(shardInfo.length, spec.getNumCorePartitions());
        } else if ("hashed".equals(partitionType)) {
          Integer[] hashShardInfo = (Integer[]) shardInfo[i];
          HashBasedNumberedShardSpec spec = (HashBasedNumberedShardSpec) dataSegment.getShardSpec();
          Assert.assertEquals((int) hashShardInfo[0], spec.getPartitionNum());
          Assert.assertEquals((int) hashShardInfo[1], spec.getNumCorePartitions());
        } else if ("single".equals(partitionType)) {
          String[] singleDimensionShardInfo = (String[]) shardInfo[i];
          SingleDimensionShardSpec spec = (SingleDimensionShardSpec) dataSegment.getShardSpec();
          Assert.assertEquals(singleDimensionShardInfo[0], spec.getStart());
          Assert.assertEquals(singleDimensionShardInfo[1], spec.getEnd());
        } else {
          throw new RE("Invalid partition type:[%s]", partitionType);
        }
      }
    }
  }
}
